// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//

#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <nebula/io/interfaces.h>
#include <turbo/base/macros.h>

namespace nebula::io {

    struct TURBO_EXPORT CacheOptions {
      static constexpr double kDefaultIdealBandwidthUtilizationFrac = 0.9;
      static constexpr int64_t kDefaultMaxIdealRequestSizeMib = 64;

      /// \brief The maximum distance in bytes between two consecutive
      ///   ranges; beyond this value, ranges are not combined
      int64_t hole_size_limit;
      /// \brief The maximum size in bytes of a combined range; if
      ///   combining two consecutive ranges would produce a range of a
      ///   size greater than this, they are not combined
      int64_t range_size_limit;
      /// \brief A lazy cache does not perform any I/O until requested.
      ///   lazy = false: request all byte ranges when PreBuffer or will_need is called.
      ///   lazy = True, prefetch_limit = 0: request merged byte ranges only after the reader
      ///   needs them.
      ///   lazy = True, prefetch_limit = k: prefetch up to k merged byte ranges ahead of the
      ///   range that is currently being read.
      bool lazy;
      /// \brief The maximum number of ranges to be prefetched. This is only used
      ///   for lazy cache to asynchronously read some ranges after reading the target range.
      int64_t prefetch_limit = 0;

      bool operator==(const CacheOptions& other) const {
        return hole_size_limit == other.hole_size_limit &&
               range_size_limit == other.range_size_limit && lazy == other.lazy &&
               prefetch_limit == other.prefetch_limit;
      }

      /// \brief Construct CacheOptions from network storage metrics (e.g. S3).
      ///
      /// \param[in] time_to_first_byte_millis Seek-time or Time-To-First-Byte (TTFB) in
      ///   milliseconds, also called call setup latency of a new read request.
      ///   The value is a positive integer.
      /// \param[in] transfer_bandwidth_mib_per_sec Data transfer Bandwidth (BW) in MiB/sec
      ///   (per connection).
      ///   The value is a positive integer.
      /// \param[in] ideal_bandwidth_utilization_frac transfer bandwidth utilization fraction
      ///   (per connection) to maximize the net data load.
      ///   The value is a positive double precision number less than 1.
      /// \param[in] max_ideal_request_size_mib The maximum single data request size (in MiB)
      ///   to maximize the net data load.
      ///   The value is a positive integer.
      /// \return A new instance of CacheOptions.
      static CacheOptions make_from_network_metrics(
          int64_t time_to_first_byte_millis, int64_t transfer_bandwidth_mib_per_sec,
          double ideal_bandwidth_utilization_frac = kDefaultIdealBandwidthUtilizationFrac,
          int64_t max_ideal_request_size_mib = kDefaultMaxIdealRequestSizeMib);

      static CacheOptions defaults();
      static CacheOptions lazy_defaults();
    };

    namespace internal {

    /// \brief A read cache designed to hide IO latencies when reading.
    ///
    /// This class takes multiple byte ranges that an application expects to read, and
    /// coalesces them into fewer, larger read requests, which benefits performance on some
    /// filesystems, particularly remote ones like Amazon S3. By default, it also issues
    /// these read requests in parallel up front.
    ///
    /// To use:
    /// 1. Cache() the ranges you expect to read in the future. Ideally, these ranges have
    ///    the exact offset and length that will later be read. The cache will combine those
    ///    ranges according to parameters (see constructor).
    ///
    ///    By default, the cache will also start fetching the combined ranges in parallel in
    ///    the background, unless CacheOptions::lazy is set.
    ///
    /// 2. Call WaitFor() to be notified when the given ranges have been read. If
    ///    CacheOptions::lazy is set, I/O will be triggered in the background here instead.
    ///    This can be done in parallel (e.g. if parsing a file, call WaitFor() for each
    ///    chunk of the file that can be parsed in parallel).
    ///
    /// 3. Call read() to retrieve the actual data for the given ranges.
    ///    A synchronous application may skip WaitFor() and just call read() - it will still
    ///    benefit from coalescing and parallel fetching.
    class TURBO_EXPORT ReadRangeCache {
     public:
      /// Default maximum gap in bytes between two consecutive ranges for them
      /// to be coalesced into a single read (see CacheOptions::hole_size_limit).
      static constexpr int64_t kDefaultHoleSizeLimit = 8192;
      /// Default upper bound in bytes on the size of a coalesced read
      /// (32 MiB; see CacheOptions::range_size_limit).
      static constexpr int64_t kDefaultRangeSizeLimit = 32 * 1024 * 1024;

      /// Construct a read cache with default options; shares ownership of \p file.
      // NOTE: `file` is deliberately passed by copy (not moved) into the
      // delegated constructor: `file.get()` is evaluated in the same argument
      // list and C++ leaves argument evaluation order unspecified, so moving
      // here could hand the delegate a null raw pointer.
      explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx)
          : ReadRangeCache(file, file.get(), std::move(ctx), CacheOptions::defaults()) {}

      /// Construct a read cache with the given options; shares ownership of \p file.
      explicit ReadRangeCache(std::shared_ptr<RandomAccessFile> file, IOContext ctx,
                              CacheOptions options)
          : ReadRangeCache(file, file.get(), std::move(ctx), options) {}

      /// Construct a read cache with an unowned file.
      ///
      /// The caller must keep \p file alive for the whole lifetime of this cache.
      ReadRangeCache(RandomAccessFile* file, IOContext ctx, CacheOptions options)
          : ReadRangeCache(nullptr, file, std::move(ctx), options) {}

      ~ReadRangeCache();

      /// \brief Cache the given ranges in the background.
      ///
      /// The caller must ensure that the ranges do not overlap with each other,
      /// nor with previously cached ranges.  Otherwise, behaviour will be undefined.
      turbo::Status Cache(std::vector<ReadRange> ranges);

      /// \brief read a range previously given to Cache().
      turbo::Result<std::shared_ptr<Buffer>> read(ReadRange range);

      /// \brief Wait until all ranges added so far have been cached.
      Future<> Wait();

      /// \brief Wait until all given ranges have been cached.
      Future<> WaitFor(std::vector<ReadRange> ranges);

     protected:
      // Pimpl forward declarations. Presumably Impl is the eager strategy and
      // LazyImpl the one selected when CacheOptions::lazy is set -- TODO
      // confirm against the implementation file; only the names are visible here.
      struct Impl;
      struct LazyImpl;

      // Delegation target shared by all public constructors. `owned_file` may
      // be null (unowned-file overload); `file` is the pointer actually used.
      ReadRangeCache(std::shared_ptr<RandomAccessFile> owned_file, RandomAccessFile* file,
                     IOContext ctx, CacheOptions options);

      std::unique_ptr<Impl> impl_;
    };

    }  // namespace internal
}  // namespace nebula::io

